ローカルファイルを参照してみる | Luigi Advent Calendar 2016 #04
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』4日目の内容となります。
今回はローカルファイルを参照していきたいと思います。
先日3日目はタスクパラメータを設定してみた でした。
Luigiは複雑データパイプラインを処理するツールになります。
当然データとしてローカルのファイルも扱うことができます。
今回はその一例としてファイルの中身を参照してみたいと思います。
下準備
参照するTSVファイルを用意しておきます。
今回はRedshiftのサンプルデータの一部をTSVとして抽出したものを用いました。
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING 2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE 3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE 4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY 5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD 6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE 7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE 8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING 9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE 10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOLD
ローカルファイルの参照
Luigiのタスク一つ一つはPythonのクラスで実装され、outputメソッドで最終的な出力の形式を、
runメソッドでそのタスク内での処理を、requiresメソッドでその依存関係を指定します。
また、ファイルの参照をするといった場合も依存関係で解決するため、今回の例ではファイルの参照を行うクラスを実装し、
それを依存関係として指定することで、当該のファイルを参照して処理をするといったことが行えることとなります。
まず、ファイルの参照を実装していきます。
上記に記載した通りファイルの参照はファイルの参照するためのタスクを作成し、outputメソッドでファイルの形式として指定する必要があります。
今回の例では以下のようなりました。
import luigi class TsvInput(luigi.Task): def output(self): return luigi.LocalTarget('customer.tsv')
上記のままではファイルの参照を指定しているだけですので、これを依存関係に組み込んで、 実際に値を参照・出力してみたいと思います。
依存関係の設定
タスクの依存関係を指定する際は、上記でも書いた通りrequiresメソッドを用います。
依存関係で用いたいタスクを戻り値として使うことで、依存関係として参照されることになります。
上記で書いたTsvInputを依存関係として使う場合は以下のようになります。
class TsvColumnShow(luigi.Task): def requires(self): return TsvInput()
依存元のタスクでoutputメソッドが定義されていた際は、
タスク内でinputメソッドを呼ぶことでoutputメソッドで指定した内容を参照することができます。
今回のファイル参照の場合、依存元でファイルの読み込みを実施し、outputメソッド内でluigi.LocalTargetメソッドで ファイルの参照を実施しているため、その後のTsvColumShowのrunメソッド内のinput()メソッドで参照できています。
class TsvColumnShow(luigi.Task): def requires(self): return TsvInput() def run(self): with self.input().open('r') as input: for line in input: print line
最後に、コード全体と、動かした際の挙動を載せておきます。
配置したtsvファイルの内容が表示されていることがわかると思います。
import luigi class TsvInput(luigi.Task): def output(self): return luigi.LocalTarget('customer.tsv') class TsvColumnShow:q(luigi.Task): def requires(self): return TsvInput() def run(self): with self.input().open('r') as input: for line in input: print line if __name__ == '__main__': luigi.run()
実行結果
$ ls UseTsv.py customer.tsv $ python ./UseTsv.py --local-scheduler TsvColumShow DEBUG: Checking if TsvColumShow() is complete /Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py:295: UserWarning: Task TsvColumShow() without outputs has no custom complete() method is_complete = task.complete() DEBUG: Checking if TsvInput() is complete INFO: Informed scheduler that task TsvColumShow__99914b932b has status PENDING INFO: Informed scheduler that task TsvInput__99914b932b has status DONE INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 98445] Worker Worker(salt=002911966, workers=1, host=HL00088, username=kajiwarayutaka, pid=98445) running TsvColumShow() 1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING 2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE 3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE 4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY 5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD 6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE 7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE 8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING 9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE 10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOLD INFO: [pid 98445] Worker Worker(salt=002911966, workers=1, host=HL00088, username=kajiwarayutaka, pid=98445) done TsvColumShow() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task TsvColumShow__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=002911966, workers=1, host=HL00088, username=kajiwarayutaka, pid=98445) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 1 present dependencies were encountered: - 1 TsvInput() * 1 ran successfully: - 1 TsvColumShow() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
まとめ
まずはデータ制御のローカルファイルの参照を行なってみました。
今回は単一のファイルを扱ったので、次回は複数のファイルを扱ってみたいと思います。